package com.ruyuan.eshop.promotion.mq.consumer.listener;

import com.alibaba.fastjson.JSON;
import com.ruyuan.eshop.common.constants.RedisKey;
import com.ruyuan.eshop.common.constants.RocketMqConstant;
import com.ruyuan.eshop.common.core.JsonResult;
import com.ruyuan.eshop.common.exception.BaseBizException;
import com.ruyuan.eshop.common.message.PlatformPromotionUserBucketMessage;
import com.ruyuan.eshop.common.utils.JsonUtil;
import com.ruyuan.eshop.membership.api.AccountApi;
import com.ruyuan.eshop.membership.domain.dto.MembershipAccountDTO;
import com.ruyuan.eshop.membership.mq.event.UserLoginedEvent;
import com.ruyuan.eshop.promotion.dao.SalesPromotionCouponItemDAO;
import com.ruyuan.eshop.promotion.domain.entity.SalesPromotionCouponDO;
import com.ruyuan.eshop.promotion.domain.entity.SalesPromotionCouponItemDO;
import com.ruyuan.eshop.promotion.domain.entity.SalesPromotionDO;
import com.ruyuan.eshop.promotion.mq.event.SalesPromotionCreatedEvent;
import com.ruyuan.eshop.promotion.mq.producer.DefaultProducer;
import com.ruyuan.eshop.promotion.redis.RedisCache;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Listener for user-login events that issues coupons to the user who just logged in.
 *
 * <p>For every consumed batch the listener reads the cached coupon id list and the cached
 * coupon details from Redis, keeps the coupons whose activity window contains the current
 * time, and then, for each login event in the batch, stores one coupon item per coupon the
 * user has not been flagged as having received yet.
 *
 * <p>Any exception raised while handling the batch is logged and answered with
 * {@link ConsumeConcurrentlyStatus#RECONSUME_LATER}; otherwise the batch is acknowledged with
 * {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}.
 */
@Slf4j
@Component
public class UserLoginedEventListener implements MessageListenerConcurrently {

    /** Separator placed between the segments of a composed Redis key. */
    private static final String KEY_SEPARATOR = "::";

    /** Value written under the per-user/per-coupon flag key after a coupon item is stored. */
    private static final String RECEIVED_FLAG = "true";

    @Autowired
    private RedisCache redisCache;
    @Autowired
    private SalesPromotionCouponItemDAO couponItemDAO;

    /**
     * Consumes a batch of user-login messages and issues the currently active coupons.
     *
     * @param list the delivered messages; each body is expected to be a JSON-encoded
     *             {@link UserLoginedEvent}
     * @param consumeConcurrentlyContext consumption context supplied by RocketMQ (unused)
     * @return {@code CONSUME_SUCCESS} when the batch was handled, {@code RECONSUME_LATER}
     *         when any exception occurred
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            log.info("开始消费用户登录自动领取优惠券消息，messageList：{}",list);

            // Load the coupons once per batch; they do not depend on the individual message.
            List<SalesPromotionCouponDO> coupons = loadActiveCoupons();
            if (coupons.isEmpty()) {
                // Nothing can be issued, and redelivering the batch would not change that.
                log.info("no active coupon found in cache, nothing to issue");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

            for (MessageExt messageExt : list) {
                log.info("处理消息，message：{}", messageExt);

                MembershipAccountDTO account = parseAccount(messageExt);
                if (account == null) {
                    // A message without an account can never succeed; skip it instead of retrying.
                    log.warn("login event carries no account, skipped, msgId：{}", messageExt.getMsgId());
                    continue;
                }
                log.info("用户详情，account：{}",account);

                for (SalesPromotionCouponDO coupon : coupons) {
                    issueCouponIfAbsent(account, coupon);
                }
            }
        } catch (Exception e) {
            log.error("consume error, 用户登录事件处理异常", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Reads the cached coupon ids and coupon details from Redis and returns the coupons whose
     * activity window strictly contains the current time.
     *
     * @return the active coupons; empty when the id list or every coupon entry is missing
     */
    private List<SalesPromotionCouponDO> loadActiveCoupons() {
        String couponIdsJson = redisCache.get(RedisKey.PROMOTION_COUPON_ID_LIST);
        if (couponIdsJson == null || couponIdsJson.isEmpty()) {
            return Collections.emptyList();
        }

        // parseArray converts every element to Long. Parsing into a raw List leaves small
        // numbers as Integer, which fails with ClassCastException when iterated as Long.
        List<Long> couponIds = JSON.parseArray(couponIdsJson, Long.class);
        if (couponIds == null || couponIds.isEmpty()) {
            return Collections.emptyList();
        }

        Date now = new Date();
        List<SalesPromotionCouponDO> activeCoupons = new ArrayList<>(couponIds.size());
        for (Long couponId : couponIds) {
            if (couponId == null) {
                continue;
            }
            String couponJson = redisCache.get(RedisKey.PROMOTION_COUPON_KEY + KEY_SEPARATOR + couponId);
            SalesPromotionCouponDO coupon = JSON.parseObject(couponJson, SalesPromotionCouponDO.class);
            if (coupon == null) {
                // The id is listed but its detail entry is not in the cache.
                log.warn("coupon detail missing in cache, skipped, couponId：{}", couponId);
                continue;
            }
            if (isActive(coupon, now)) {
                activeCoupons.add(coupon);
            }
        }
        return activeCoupons;
    }

    /**
     * Tells whether {@code now} lies strictly between the coupon's activity start and end time.
     * A coupon with a missing start or end time is treated as inactive.
     */
    private static boolean isActive(SalesPromotionCouponDO coupon, Date now) {
        Date start = coupon.getActivityStartTime();
        Date end = coupon.getActivityEndTime();
        return start != null && end != null && now.after(start) && now.before(end);
    }

    /**
     * Decodes the message body as a UTF-8 JSON {@link UserLoginedEvent} and returns its account.
     *
     * @return the account of the user who logged in, or {@code null} when the body, the event
     *         or the account is absent
     */
    private MembershipAccountDTO parseAccount(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        if (body == null) {
            return null;
        }
        // Decode explicitly as UTF-8 so the result does not depend on the platform charset.
        String message = new String(body, java.nio.charset.StandardCharsets.UTF_8);
        UserLoginedEvent userLoginedEvent = JSON.parseObject(message, UserLoginedEvent.class);
        return userLoginedEvent == null ? null : userLoginedEvent.getAccount();
    }

    /**
     * Stores a coupon item for the account unless Redis already holds the received flag for
     * this account/coupon pair, then writes the flag.
     *
     * <p>NOTE(review): the flag lookup, the insert and the flag write are separate steps, so a
     * concurrent or repeated delivery could insert twice before the flag is written. Confirm
     * whether {@code receiveCoupon} or a database constraint prevents duplicates.
     */
    private void issueCouponIfAbsent(MembershipAccountDTO account, SalesPromotionCouponDO coupon) {
        String receivedFlagKey = RedisKey.PROMOTION_USER_RECEIVE_COUPON
                + KEY_SEPARATOR + account.getId()
                + KEY_SEPARATOR + coupon.getId();

        String receiveCouponFlag = redisCache.get(receivedFlagKey);
        if (receiveCouponFlag != null && !receiveCouponFlag.isEmpty()) {
            // Already flagged as received for this user.
            return;
        }

        Date now = new Date();
        SalesPromotionCouponItemDO couponItem = new SalesPromotionCouponItemDO();
        couponItem.setActivityEndTime(coupon.getActivityEndTime());
        couponItem.setActivityStartTime(coupon.getActivityStartTime());
        couponItem.setCouponId(coupon.getId());
        couponItem.setCouponType(coupon.getCouponType());
        couponItem.setCreateTime(now);
        couponItem.setCreateUser(account.getId());
        couponItem.setIsUsed(0);
        couponItem.setUpdateTime(now);
        couponItem.setUpdateUser(account.getId());
        couponItem.setUserAccountId(account.getId());
        couponItemDAO.receiveCoupon(couponItem);

        // Third argument -1: presumably "no expiry" - TODO confirm against RedisCache.set.
        redisCache.set(receivedFlagKey, RECEIVED_FLAG, -1);
    }

}
